package org.luosl.webmagicx.pipeline

import java.io.Closeable

import org.luosl.webmagicx.conf.{SpiderConf, XmlProps}
import org.luosl.webmagicx.utils.Logging
import org.luosl.webmagicx.listeners.PipelineListener
import org.luosl.webmagicx.pipeline.component.Distinct
import us.codecraft.webmagic.{ResultItems, Task}
import us.codecraft.webmagic.pipeline.Pipeline

import scala.collection.mutable.ArrayBuffer

/**
  * Base class for webmagic pipelines that wraps a subclass-provided [[save]] step with optional
  * de-duplication (see [[distinctOpt]]) and listener callbacks (success / skip / error).
  *
  * Created by luosl on 2017/11/7.
  *
  * @param sc    spider configuration; its field names, followed by `_url` and `_page`, form [[allFields]]
  * @param task  not referenced by this base class
  * @param props not referenced by this base class
  */
abstract class AbstractPipeline(sc:SpiderConf, task:Task, props:XmlProps) extends Pipeline with Logging with Closeable{

  /**
    * Listeners notified after each item is saved, skipped as a duplicate, or fails to save.
    */
  protected var listeners:ArrayBuffer[PipelineListener] = ArrayBuffer.empty[PipelineListener]

  /**
    * Names of the configured fields, followed by the built-in `_url` and `_page` keys.
    */
  protected val allFields:List[String] = sc.fields.map(_.name).toList ::: List("_url", "_page")

  /**
    * Registers a listener.
    * @param listener listener to append
    * @return this pipeline, so calls can be chained
    */
  def addListener(listener:PipelineListener):AbstractPipeline = {
    listeners += listener
    this
  }

  /** Notifies every registered listener that `resultItems` was skipped as a duplicate. */
  def onSkip(resultItems: ResultItems, task: Task): Unit =
    listeners.foreach(_.onSkip(resultItems, task))

  /** Notifies every registered listener that saving `resultItems` failed. */
  def onError(resultItems: ResultItems, task: Task): Unit =
    listeners.foreach(_.onError(resultItems, task))

  /** Notifies every registered listener that `resultItems` was saved. */
  def onSuccess(resultItems: ResultItems, task: Task): Unit =
    listeners.foreach(_.onSuccess(resultItems, task))

  /**
    * De-duplicator consulted before saving. `None` (the default) disables de-duplication.
    * @return the de-duplicator, if any
    */
  def distinctOpt():Option[Distinct] = None

  /**
    * Persists one result. Implemented by subclasses.
    * @param resultItems resultItems
    * @param task task
    */
  def save(resultItems: ResultItems, task: Task): Unit

  /**
    * Entry point called by webmagic for every result.
    *
    * If a de-duplicator is configured and reports the item as already seen, the item is skipped
    * and the skip listeners run. Otherwise the item is passed to [[save]] and the success
    * listeners run. If the de-duplication check or the save throws, the failure is logged, the
    * error listeners run, and the original exception is rethrown.
    *
    * @param resultItems the extracted result
    * @param task        the crawl task
    */
  def process(resultItems: ResultItems, task: Task):Unit = {
    // Only the de-duplication check and the save itself form the "save phase". Listener callbacks
    // run afterwards, so a failing onSuccess/onSkip listener is not reported as a save failure and
    // onError never fires after onSuccess already did.
    val saved =
      try {
        // Call distinctOpt() exactly once: an override may build a fresh Distinct per call.
        // forall is true when there is no de-duplicator, so the item is always saved in that case.
        val isNew = distinctOpt().forall(_.isUniqueAndAdd(resultItems))
        if (isNew) save(resultItems, task)
        isNew
      } catch {
        case e: Exception =>
          logError(s"保存[url=${resultItems.get("_url")}]失败.....")
          notifyErrorKeepingCause(resultItems, task, e)
          throw e
      }

    if (saved) {
      logInfo(s"保存[url=${resultItems.get("_url")}]成功.....")
      onSuccess(resultItems, task)
    } else {
      onSkip(resultItems, task)
      logInfo(s"[url=${resultItems.get("_url")}]已经存在,跳过save阶段...")
    }
  }

  /**
    * Runs the error listeners without letting a listener failure hide the original save error.
    * Any exception thrown by a listener is attached to `cause` as a suppressed exception.
    */
  private def notifyErrorKeepingCause(resultItems: ResultItems, task: Task, cause: Exception): Unit =
    try onError(resultItems, task)
    catch {
      case listenerFailure: Exception => cause.addSuppressed(listenerFailure)
    }

  /** This base class holds no resources. Subclasses that open resources should override this. */
  override def close(): Unit = ()

}
